0bdf37
@@ -94,7 +94,9 @@
   private final ListeningExecutorService waitQueueExecutorService;
   // Thread pool for callbacks on completion of execution of a work unit.
   private final ListeningExecutorService executionCompletionExecutorService;
-  private final BlockingQueue<TaskWrapper> preemptionQueue;
+
+  @VisibleForTesting
+  final BlockingQueue<TaskWrapper> preemptionQueue;
   private final boolean enablePreemption;
   private final ThreadPoolExecutor threadPoolExecutor;
   private final AtomicInteger numSlotsAvailable;
@@ -183,6 +185,7 @@
protected SimpleDateFormat initialValue() {
 
   @Override
   public Set<String> getExecutorsStatus() {
+    // TODO Change this method to make the output easier to parse (parse programmatically)
     Set<String> result = new HashSet<>();
     StringBuilder value = new StringBuilder();
     for (Map.Entry<String, TaskWrapper> e : knownTasks.entrySet()) {
@@ -449,11 +452,7 @@
public void killFragment(String fragmentId) {
           if (isDebugEnabled) {
             LOG.debug("Removing {} from preemptionQueue", fragmentId);
           }
-          taskWrapper.setIsInPreemptableQueue(false);
-          preemptionQueue.remove(taskWrapper);
-          if (metrics != null) {
-            metrics.setExecutorNumPreemptableRequests(preemptionQueue.size());
-          }
+          removeFromPreemptionQueue(taskWrapper);
         }
         taskWrapper.getTaskRunnerCallable().killTask();
       } else {
@@ -463,7 +462,8 @@
public void killFragment(String fragmentId) {
     }
   }
 
-  private void trySchedule(final TaskWrapper taskWrapper) throws RejectedExecutionException {
+  @VisibleForTesting
+  void trySchedule(final TaskWrapper taskWrapper) throws RejectedExecutionException {
 
       synchronized (lock) {
         boolean canFinish = taskWrapper.getTaskRunnerCallable().canFinish();
@@ -508,7 +508,7 @@
private void handleScheduleAttemptedRejection(TaskWrapper taskWrapper) {
         LOG.debug("Preemption Queue: " + preemptionQueue);
       }
 
-      TaskWrapper pRequest = removeAndGetFromPreemptionQueue();
+      TaskWrapper pRequest = removeAndGetNextFromPreemptionQueue();
 
       // Avoid preempting tasks which are finishable - callback still to be processed.
       if (pRequest != null) {
@@ -548,18 +548,12 @@
private void finishableStateUpdated(TaskWrapper taskWrapper, boolean newFinishab
       if (newFinishableState == true && taskWrapper.isInPreemptionQueue()) {
         LOG.debug("Removing {} from preemption queue because it's state changed to {}",
             taskWrapper.getRequestId(), newFinishableState);
-        preemptionQueue.remove(taskWrapper.getTaskRunnerCallable());
-        if (metrics != null) {
-          metrics.setExecutorNumPreemptableRequests(preemptionQueue.size());
-        }
+        removeFromPreemptionQueue(taskWrapper);
       } else if (newFinishableState == false && !taskWrapper.isInPreemptionQueue() &&
           !taskWrapper.isInWaitQueue()) {
         LOG.debug("Adding {} to preemption queue since finishable state changed to {}",
             taskWrapper.getRequestId(), newFinishableState);
-        preemptionQueue.offer(taskWrapper);
-        if (metrics != null) {
-          metrics.setExecutorNumPreemptableRequests(preemptionQueue.size());
-        }
+        addToPreemptionQueue(taskWrapper);
       }
       lock.notify();
     }
@@ -567,7 +561,12 @@
private void finishableStateUpdated(TaskWrapper taskWrapper, boolean newFinishab
 
   private void addToPreemptionQueue(TaskWrapper taskWrapper) {
     synchronized (lock) {
-      preemptionQueue.add(taskWrapper);
+      boolean added = preemptionQueue.offer(taskWrapper);
+      if (!added) {
+        LOG.warn("Failed to add element {} to preemption queue. Terminating", taskWrapper);
+        Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(),
+            new IllegalStateException("Preemption queue full. Cannot proceed"));
+      }
       taskWrapper.setIsInPreemptableQueue(true);
       if (metrics != null) {
         metrics.setExecutorNumPreemptableRequests(preemptionQueue.size());
@@ -575,10 +574,30 @@
private void addToPreemptionQueue(TaskWrapper taskWrapper) {
     }
   }
 
-  private TaskWrapper removeAndGetFromPreemptionQueue() {
+  /**
+   * Remove the specified taskWrapper from the preemption queue while holding {@code lock}.
+   * @param taskWrapper the taskWrapper to be removed
+   * @return true if the element existed in the queue and was removed, false otherwise
+   */
+  private boolean removeFromPreemptionQueue(TaskWrapper taskWrapper) {
+    synchronized (lock) {
+      return removeFromPreemptionQueueUnlocked(taskWrapper);
+    }
+  }
+
+  private boolean removeFromPreemptionQueueUnlocked(TaskWrapper taskWrapper) { // takes no lock
+    boolean removed = preemptionQueue.remove(taskWrapper);
+    taskWrapper.setIsInPreemptableQueue(false); // cleared even when the wrapper was not queued
+    if (metrics != null) {
+      metrics.setExecutorNumPreemptableRequests(preemptionQueue.size()); // publish new queue size
+    }
+    return removed; // result of preemptionQueue.remove(taskWrapper)
+  }
+
+  private TaskWrapper removeAndGetNextFromPreemptionQueue() {
     TaskWrapper taskWrapper;
     synchronized (lock) {
-       taskWrapper = preemptionQueue.remove();
+       taskWrapper = preemptionQueue.poll();
       if (taskWrapper != null) {
         taskWrapper.setIsInPreemptableQueue(false);
         if (metrics != null) {
@@ -603,6 +622,24 @@
public InternalCompletionListener(TaskWrapper taskWrapper) {
       this.taskWrapper = taskWrapper;
     }
 
+    // By the time either success / failed are called, the task itself knows that it has terminated,
+    // and will ignore subsequent kill requests if they go out.
+
+    // There's a race between removing the current task from the preemption queue and the actual scheduler
+    // attempting to take an element from the preemption queue to make space for another task.
+    // If the current element is removed to make space - that is OK, since the current task is completing and
+    // will end up making space for execution. Any kill message sent out by the scheduler to the task will
+    // be ignored, since the task knows it has completed (otherwise it would not be in this callback).
+    //
+    // If the task is removed from the queue as a result of this callback, and the scheduler happens to
+    // be in the section where it's looking for a preemptible task - the scheduler may end up pulling the
+    // next pre-emptible task and killing it (an extra preemption).
+    // TODO: This potential extra preemption can be avoided by synchronizing the entire tryScheduling block.
+    // This would essentially synchronize all operations - it would be better to see if there's an
+    // approach where multiple locks could be used to avoid single threaded operation.
+    // - It checks available and preempts (which could be this task)
+    // - Or this task completes making space, and removing the need for preemption
+
     @Override
     public void onSuccess(TaskRunner2Result result) {
       knownTasks.remove(taskWrapper.getRequestId());
@@ -626,15 +663,12 @@
private void updatePreemptionListAndNotify(EndReason reason) {
       // if this task was added to pre-emption list, remove it
       if (enablePreemption) {
         String state = reason == null ? "FAILED" : reason.name();
-        boolean removed = preemptionQueue.remove(taskWrapper);
+        boolean removed = removeFromPreemptionQueueUnlocked(taskWrapper);
         if (removed && isInfoEnabled) {
           TaskRunnerCallable trc = taskWrapper.getTaskRunnerCallable();
           LOG.info(TaskRunnerCallable.getTaskIdentifierString(trc.getRequest(),
               trc.getVertexSpec()) + " request " + state + "! Removed from preemption list.");
         }
-        if (metrics != null) {
-          metrics.setExecutorNumPreemptableRequests(preemptionQueue.size());
-        }
       }
 
       numSlotsAvailable.incrementAndGet();
